package spark.example

import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Batch job, intended to run once a day, that moves Parquet data on HDFS into a
  * per-day output directory.
  *
  * Steps performed by [[ReadParquetData.main]]:
  *   1. read the Parquet dataset under `hdfs://cdhmanager:8020//tmp/sparktesttow`;
  *   2. print its schema to stdout;
  *   3. append the `value` column, coalesced to a single partition, to
  *      `hdfs://cdhmanager:8020/tmp/spark<yyyy-MM-dd>` (current date);
  *   4. recursively delete the source directory if it exists.
  */
object ReadParquetData {

  /**
    * Entry point of the job.
    *
    * The SparkContext is always stopped before returning, even when reading,
    * writing or deleting fails. Any such failure propagates to the caller, and
    * the source directory is only deleted after the write has completed
    * without throwing.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val sourcePath = new Path("hdfs://cdhmanager:8020//tmp/sparktesttow")
    // NOTE(review): the master is hard-coded to "local" — confirm this is
    // intended before running the job on a cluster.
    val sparkConf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(sparkConf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      val source: DataFrame = sqlContext.read.parquet(sourcePath.toString)
      source.printSchema()

      // Output directory name carries today's date; SimpleDateFormat formats
      // it in the JVM's default time zone.
      val today = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
      val targetDir = s"hdfs://cdhmanager:8020/tmp/spark$today"

      // Only the "value" column is kept; coalesce(1) reduces the data to a
      // single partition before the append.
      source
        .select(source("value"))
        .coalesce(1)
        .write
        .mode(Append)
        .parquet(targetDir)

      // Reached only if the write above did not throw, so the source data is
      // never removed after a failed copy.
      // The FileSystem handle is deliberately not closed: FileSystem.get
      // normally hands out a cached instance that other code in this JVM may
      // share (see the Hadoop FileSystem documentation).
      val hdfs = FileSystem.get(new URI("hdfs://cdhmanager:8020"), new Configuration())
      if (hdfs.exists(sourcePath)) {
        hdfs.delete(sourcePath, true)
      }
    } finally {
      // Release Spark resources on both the success and the failure path.
      sc.stop()
    }
  }
}
